Skip to content

KAFKA-20309: Limit SharePollEvent to single instance#21757

Merged
AndrewJSchofield merged 1 commit intoapache:trunkfrom
AndrewJSchofield:KAFKA-20309
Mar 16, 2026
Merged

KAFKA-20309: Limit SharePollEvent to single instance#21757
AndrewJSchofield merged 1 commit intoapache:trunkfrom
AndrewJSchofield:KAFKA-20309

Conversation

@AndrewJSchofield
Copy link
Member

@AndrewJSchofield AndrewJSchofield commented Mar 14, 2026

When a share consumer is initially joining a group but not yet fetching
records, the poll loop has no records to wait for. Each iteration of the
loop was creating an instance of SharePollEvent without considering
whether an in-flight event existed. As a result, many events could
briefly queue up for no good reason, only to be drained once the
consumer successfully joined the group.

The PR follows a similar design as used in the AsyncKafkaConsumer for
managing its poll event, without that code's extra complication of
validating the position.

Reviewers: TaiJuWu tjwu1217@gmail.com, Lianet Magrans
lmagrans@confluent.io

Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would only add that we should add tests for the new event generation behaviour, mainly to catch regressions (e.g., no new event created on poll if there is an in-flight).

I wanted to link you the the similar async consumer test and turns out we're missing the coverage there too :) I will file a jira now to add that coverage to the async. For share, we can add it here or in that same jira (the test goal is the same, impl should be very similar I expect). As you prefer.

--update
https://issues.apache.org/jira/browse/KAFKA-20315

shareMembershipManager.maybeReconcile(true));

requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
ShareMembershipManager membershipManager = hrm.membershipManager();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my understanding, is there a reason why we prefer to go through the HBReqMgr to get the shareMembershipManager here, instead of just calling onConsumerPoll on the shareMembershipManager we already had above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just followed the pattern with the other group types. I don't think there was a reason this was different.

@AndrewJSchofield AndrewJSchofield merged commit 2f2d9b0 into apache:trunk Mar 16, 2026
28 checks passed
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-20309 branch March 16, 2026 14:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants